Conversation

@garyrussell (Contributor)

Initial PoC only.

@artembilan (Member) left a comment

Just a couple concerns from me.

class Listener {

	@KafkaListener(topics = "skReactorTopic")
	public Disposable listen(Flux<ReceiverRecord<String, String>> flux) {

@artembilan (Member):

I think the return type must be Mono<Void>, to postpone the subscription to the container, similar to what we have with the WebFlux and RSocket implementations in Spring.


Yes, I agree with Artem that it would be better to return Mono<Void> here
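
For illustration, a minimal sketch of the suggested signature (assuming the container subscribes to the returned Mono to start consumption; process() is a hypothetical per-record handler):

	@KafkaListener(topics = "skReactorTopic")
	public Mono<Void> listen(Flux<ReceiverRecord<String, String>> flux) {
		return flux
				.doOnNext(record -> process(record))
				.then(); // Mono<Void> that completes when the record pipeline terminates
	}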

@artembilan (Member)

@bsideup,

We would appreciate your feedback on this one, since you are familiar with Reactor Kafka.

Thank you in advance!


@Override
public void stop() {
	if (this.disposable != null && !this.disposable.isDisposed()) {

  1. No need to check isDisposed(); dispose() should be idempotent.
  2. Hint: you can pre-initialize the field with something like static final Disposable DISPOSED = Disposables.disposed() to avoid the null checks.
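
A minimal sketch of that suggestion, using Reactor's Disposables.disposed() factory (class and field names here are illustrative):

	private volatile Disposable disposable = Disposables.disposed(); // already disposed; never null

	@Override
	public void stop() {
		this.disposable.dispose(); // idempotent; no null or isDisposed() guard needed
	}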

@artembilan (Member) left a comment

A couple concerns so far.

import org.springframework.util.StringUtils;
import org.springframework.validation.Validator;

import reactor.core.publisher.Flux;

@artembilan (Member):

With this, Reactor becomes a non-optional dependency...
Is that really what we are aiming for now?

public void start() {
	ReceiverOptions<?, ?> options = ReceiverOptions.create(this.configs)
			.subscription(this.topics);
	Flux<?> flux = KafkaReceiver.create(options).receive();

@artembilan (Member):

Anything wrong with using ReactiveKafkaConsumerTemplate instead?
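
For reference, a sketch of what that alternative might look like in this start() method (generics here are illustrative; the template wraps a KafkaReceiver built from the same ReceiverOptions):

	public void start() {
		ReceiverOptions<Object, Object> options = ReceiverOptions.create(this.configs)
				.subscription(this.topics);
		// ReactiveKafkaConsumerTemplate is spring-kafka's reactive wrapper around KafkaReceiver
		ReactiveKafkaConsumerTemplate<Object, Object> template = new ReactiveKafkaConsumerTemplate<>(options);
		Flux<ReceiverRecord<Object, Object>> flux = template.receive();
	}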

@garyrussell (Contributor, Author)

Closing for now.

@garyrussell garyrussell added this to the Backlog milestone Aug 14, 2019
@hartmut-co-uk

Hi, have there been issues with the implementation so far, or has it simply been de-scoped for now?

@garyrussell (Contributor, Author)

We just haven't had the bandwidth to work on it; we have also discovered some problems with the reactor-kafka project with respect to producer fencing when transactions are being used. That issue needs to be addressed over there; I hope to find some time to look at it soon.


@roger751

@garyrussell can you update us on the progress of this subject? I have some questions I can't find answers to:

  1. What were the problems with Reactor Kafka and transactions?
  2. Have they been solved?
  3. Is there any alternative, or is this still planned to be implemented?

@garyrussell (Contributor, Author)

  1. The problem is around producer fencing: when using EOS mode ALPHA (now called V1), the transactional id needs to be based on the group/topic/partition; currently, it processes the entire ConsumerRecords batch in a single transaction, and the records might be from different partitions.
  2. It is solved by using EOS mode BETA (now called V2), but that needs some work: reactor/reactor-kafka#207 (Add Support for EOS Mode BETA (aka V2)).
  3. As I said on the other issue in reactor-kafka, you need to explain exactly what use case you are trying to satisfy. There is an impedance mismatch between the projects, so it's hard to come up with a general solution for all possible use cases.

@roger751

I understand. Thank you!

@roger751 commented Jan 3, 2022

Hi @garyrussell, I'm reading this thread again, and I wonder whether, after reactor/reactor-kafka#258 is merged, you plan to reactivate this PR for a reactive @KafkaListener. As for your question about my use case, I think it's the most general and common one: I'm reading from Kafka in a Reactor-based application, and I want to get the most out of the great Spring Boot and Spring Cloud features. Ideally, I would do that by using Reactor Kafka with Spring Cloud Stream. A reactive @KafkaListener is just a code reducer and, in my opinion, a much more comfortable way of writing code.

@garyrussell (Contributor, Author)

I don't see any "code reducing" here. Reactor Kafka is doing the same work as the listener container; the goal being to get records from the broker and pass them to user code. The listener container and consumer event loop are effectively doing the same thing.

KafkaReceiver.create(ro)
		.receive()
		.publishOn(Schedulers.single(), 1)
		.doOnNext(record -> {
			processIt(record);
		})
		...
		.subscribe();

I am struggling to see the value add; a concrete example might help.

@garyrussell garyrussell removed this from the Backlog milestone Oct 20, 2022
@harishvashistha

@garyrussell the above code works perfectly, as you suggested. But can you help with an example where reactor-kafka subscribes to multiple partitions of a topic concurrently in the same service instance? I don't want to abandon Reactor Kafka just because of this feature; maybe I am simply not aware of how to do it. Please help.

@garyrussell (Contributor, Author)

@harishvashistha It is not clear what you mean; you can subscribe to multiple topics in ReceiverOptions.subscription(). Kafka will assign as many partitions as possible to each consumer (with each consumer in the same group getting different partitions). If there is only one consumer, it will get all the partitions.
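
For example, a minimal sketch (topic names and the consumerProps map are illustrative): a single receiver subscribed like this is assigned every partition of both topics if it is the only consumer in its group:

	ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(consumerProps)
			.subscription(List.of("topic1", "topic2")); // one consumer in the group gets all partitions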

@harishvashistha

@garyrussell I mean, I have this code, which returns ServerSentEvents to an SSE API over HTTP by fetching records from Kafka, but it creates a single consumer for a Kafka topic with 20 partitions:

public Flux<ServerSentEvent<String>> getServerSentEvents() {
	final KafkaReceiver<Key, ValueSse> kafkaReceiverFromLastOffset = kafkaReceiverProvider.getObject();

	// note: receive() must be called on the receiver obtained above
	return kafkaReceiverFromLastOffset.receive()
			.publishOn(Schedulers.fromExecutor(taskExecutor))
			.log()
			.doOnEach(receiverRecordSignal -> log.trace("message received, from queue: {} ", receiverRecordSignal.get()))
			.map(ConsumerRecord::value)
			.map(valueSse -> ServerSentEvent.builder().data(valueSse).build())
			.doOnError(e -> log.error("invalid payload received while processing kafka message ", e))
			.onErrorContinue((e, record) -> log.error(MESSAGE_PARSING_FAILED, e));
}

@Bean
@Scope("prototype")
public KafkaReceiver<Key, ValueSse> kafkaReceiver() throws ExecutionException, InterruptedException {
	final String consumerId = UUID.randomUUID().toString();
	final Map<String, Object> propsMaps = kafkaReceiverConfigurations(consumerId);

	// public factory; the options assign specific partitions and seek each to the end
	return KafkaReceiver.create(ReceiverOptions.<Key, ValueSse>create(propsMaps)
			.addAssignListener(receiverPartitions -> {
				for (final ReceiverPartition p : receiverPartitions) {
					p.seekToEnd();
				}
			})
			.assignment(getTopicPartitions()));
}

As you said above, this kafkaReceiver.receive() creates a single Kafka consumer for all the partitions of the topic.

But how can I create multiple Kafka consumers reading from multiple partitions in parallel, as my requirement demands, given that I have to return this Flux of ServerSentEvent to the calling client over the SSE API?

I have also followed the approach shared in https://stackoverflow.com/questions/69891782/multi-threading-on-kafka-send-in-spring-reactor-kafka, but I'm not sure how to use this code to return a Flux of ServerSentEvent to the calling API:

@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
	ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
	return basicReceiverOptions.subscription(Collections.singletonList(topic))
			.addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
			.addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
	return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}

public void run(String... args) {
	for (int i = 0; i < topicPartitionsCount; i++) {
		readWrite(destinationTopic).subscribe();
	}
}

public Flux<String> readWrite(String destTopic) {
	return kafkaConsumerTemplate
			.receiveAutoAck()
			.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
					consumerRecord.key(),
					consumerRecord.value(),
					consumerRecord.topic(),
					consumerRecord.offset()))
			.map(ConsumerRecord::value)
			.onErrorContinue((exception, errorConsumer) ->
					log.error("Error while consuming : {}", exception.getMessage()));
}

@garyrussell (Contributor, Author)

You would need to create multiple receivers and merge their results into a single flux.

@harishvashistha commented Jan 9, 2023

@garyrussell are you hinting that something like this would be required?

public Flux<ServerSentEvent<String>> getSSEDataFromMultiplePartitions() {
	Flux<ServerSentEvent<String>> toReturn = Flux.empty();
	for (int i = 0; i < partitionCount; i++) {
		Flux<ServerSentEvent<String>> eventsFromDedicatedConnection = getServerSentEvents();
		toReturn.mergeWith(eventsFromDedicatedConnection);
	}
	return toReturn;
}

But this is not returning any data to the SSE channel at all.

(The getServerSentEvents() method and the kafkaReceiver() bean are the same as shown in my earlier comment above.)

@garyrussell (Contributor, Author)

Sorry; I am not a Reactor expert; you'd be better off asking somewhere like Stack Overflow.

That said, you seem to be discarding the new flux created by mergeWith()...

toReturn.mergeWith(eventsFromDedicatedConnection);

Looks like it should be

toReturn = toReturn.mergeWith(eventsFromDedicatedConnection);
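
For what it's worth, a sketch of a variant that avoids the reassignment entirely by collecting the per-receiver fluxes and merging them once (assuming the same getServerSentEvents() factory and partitionCount field):

	public Flux<ServerSentEvent<String>> getSSEDataFromMultiplePartitions() {
		List<Flux<ServerSentEvent<String>>> sources = new ArrayList<>();
		for (int i = 0; i < partitionCount; i++) {
			sources.add(getServerSentEvents()); // each call obtains a dedicated receiver
		}
		return Flux.merge(sources); // interleaves records from all receivers as they arrive
	}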
